
fix(shuffle): fetch S3 partitions directly and stream IPC writes #43

Merged
phillipleblanc merged 1 commit into spiceai-52.5 from phillip/shuffle-final-fetch-via-object-store on May 21, 2026


@phillipleblanc phillipleblanc commented May 20, 2026

Summary

Fixes two object-store shuffle failures that appeared after prefixed S3 shuffle writes started landing in the correct location.

First, BallistaClient::fetch_partition always used the executor gRPC FetchPartition path. That handler opens paths with tokio::fs::File::open, so final-stage result fetches failed when the partition location was an s3://... URL. This now detects S3 partition paths and reads them through the existing object-store shuffle reader instead.
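
The routing decision described above can be sketched in a few lines. This is a minimal illustration, not the PR's code: `FetchRoute`, `is_s3_path`, and `route_for` are hypothetical names, and the check assumes a plain `s3://` prefix test is enough to identify object-store partition paths.

```rust
/// Where a partition fetch should be sent. Hypothetical type for illustration.
#[derive(Debug, PartialEq, Eq)]
enum FetchRoute {
    /// Read the partition straight from the object store.
    ObjectStore,
    /// Ask the executor over gRPC, which opens the path as a local file.
    ExecutorGrpc,
}

/// Returns true only for paths that start with the `s3://` scheme.
/// Assumption: a prefix check is sufficient; the real helper may parse the URL.
fn is_s3_path(path: &str) -> bool {
    path.starts_with("s3://")
}

/// Picks the fetch route from the partition location alone.
fn route_for(path: &str) -> FetchRoute {
    if is_s3_path(path) {
        FetchRoute::ObjectStore
    } else {
        FetchRoute::ExecutorGrpc
    }
}
```

Deciding from the path alone keeps the choice local to the client, so callers of the fetch function would not need to know which storage backend a stage wrote to.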

Second, the object-store Arrow IPC writer serialized each batch as a complete IPC stream and concatenated those streams into one S3 object. Arrow readers stop at the first end-of-stream marker, so partitions with more than one batch could fail with Unexpected EOS. This now keeps one StreamWriter per output partition and emits the IPC header and end-of-stream marker only once.
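
A toy model may help show why concatenated streams lose data. The sketch below does not use Arrow's wire format: single bytes stand in for the schema header, a record batch, and the end-of-stream marker, and all names are hypothetical. The toy reader simply stops at the first marker and reports how many bytes it left unread, whereas the failure reported in this PR surfaced as an `Unexpected EOS` error.

```rust
const HEADER: u8 = b'H'; // stands in for the schema message
const BATCH: u8 = b'B'; // stands in for one record batch message
const EOS: u8 = b'E'; // stands in for the end-of-stream marker

/// One complete toy stream: header, the given number of batches, then EOS.
fn complete_stream(batches: usize) -> Vec<u8> {
    let mut out = vec![HEADER];
    out.extend(std::iter::repeat(BATCH).take(batches));
    out.push(EOS);
    out
}

/// Old behaviour: every batch becomes its own complete stream,
/// and the streams are concatenated into one object.
fn concatenated_per_batch(batches: usize) -> Vec<u8> {
    (0..batches).flat_map(|_| complete_stream(1)).collect()
}

/// Toy reader: expects a header, counts batches, stops at the first EOS.
/// Returns (batches read, bytes left unread), or None for a malformed stream.
fn read_stream(bytes: &[u8]) -> Option<(usize, usize)> {
    let (first, rest) = bytes.split_first()?;
    if *first != HEADER {
        return None;
    }
    let mut count = 0;
    for (i, b) in rest.iter().enumerate() {
        match *b {
            BATCH => count += 1,
            EOS => return Some((count, rest.len() - i - 1)),
            _ => return None,
        }
    }
    None // ran out of bytes without seeing an EOS marker
}
```

With three batches, the concatenated layout yields one batch and leaves six bytes unread, while the single-stream layout yields all three batches and leaves nothing behind.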

Changes

  • Route S3 partition fetches in BallistaClient::fetch_partition directly through the object-store reader.
  • Share the object-store partition reader through shuffle_reader helpers used by both intermediate shuffle reads and final-stage fetches.
  • Replace per-batch IPC serialization with StreamingMultipartIpcUploader, which streams one valid Arrow IPC stream per shuffle partition.
  • Update no-repartition and hash-repartition object-store writer paths to use the new uploader.
  • Add an in-memory object-store regression test for multi-batch IPC uploads.

Test plan

  • cargo fmt --all -- --check
  • cargo clippy --all-targets --workspace --all-features -- -D warnings
  • cargo test -p ballista-core --lib shuffle_storage
  • Manual end-to-end test with a local 1 scheduler / 2 executor Spice cluster and shuffle_location: s3://<bucket>/<two-segment-prefix>:
    • SELECT COUNT(*) FROM t returned 2000000
    • SELECT col, COUNT(*) FROM t GROUP BY col ORDER BY ... LIMIT 5 returned 5 rows
  • GitHub CI

Copilot AI review requested due to automatic review settings May 20, 2026 02:52

Copilot AI left a comment


Pull request overview

Fixes object-store shuffle correctness by (1) routing final-stage partition fetches for object-store URLs around the gRPC/local-file fetch path, and (2) ensuring Arrow IPC data written to object store forms a single valid IPC stream per output partition (no per-batch EOS markers).

Changes:

  • Added shuffle_reader helpers to detect object-store URLs and stream partitions directly from object store, and used them from BallistaClient::fetch_partition.
  • Reworked object-store shuffle writers to maintain one IPC StreamWriter per output partition across the whole multipart upload (new StreamingMultipartIpcUploader), eliminating concatenated IPC streams.
  • Exposed shuffle_reader module internally and updated comments where the new routing behavior is relied upon.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| ballista/core/src/execution_plans/shuffle_writer.rs | Writes Arrow IPC to object store as one stream per partition via multipart upload; removes per-batch IPC stream concatenation. |
| ballista/core/src/execution_plans/shuffle_reader.rs | Adds reusable object-store streaming fetch helper and path_is_object_store scheme detection. |
| ballista/core/src/execution_plans/mod.rs | Exposes shuffle_reader internally (pub(crate)) for cross-module reuse. |
| ballista/core/src/execution_plans/distributed_query.rs | Documents reliance on BallistaClient::fetch_partition object-store routing for final-stage fetches. |
| ballista/core/src/client.rs | Routes object-store URLs to the object-store streaming reader instead of gRPC/local-file fetch. |

Comments suppressed due to low confidence (1)

ballista/core/src/execution_plans/shuffle_reader.rs:972

  • fetch_object_store_partition_stream relies on build_shuffle_object_store(&url), but that helper only constructs an S3 store (and explicitly errors on non-s3 schemes). Given this helper is now used outside the shuffle reader (driver-side fetch via BallistaClient::fetch_partition), it should either support the same object-store URL schemes as fetch_partition_object_store_inner (e.g. abfs://...) or clearly reject them before routing to avoid hard-to-diagnose failures.
    let url = Url::parse(path).map_err(|e| {
        BallistaError::General(format!(
            "Failed to parse object store URL '{path}': {e:?}"
        ))
    })?;

    let store = build_shuffle_object_store(&url).map_err(|e| {
        BallistaError::FetchFailed(
            executor_id.to_owned(),
            stage_id,
            partition_id,
            format!("Failed to build object store client for '{path}': {e:?}"),
        )
    })?;



Comment thread ballista/core/src/execution_plans/shuffle_reader.rs Outdated
Comment thread ballista/core/src/execution_plans/shuffle_writer.rs
Comment thread ballista/core/src/execution_plans/shuffle_writer.rs
@phillipleblanc phillipleblanc self-assigned this May 20, 2026
…eam per-partition IPC as a single Arrow stream

Fixes two independent object-store shuffle correctness bugs that were latent
until PR #42 fixed the writer-side prefix bug and let intermediate shuffles
actually land in S3:

1. BallistaClient::fetch_partition was always going via the gRPC
   Action::FetchPartition, whose executor-side handler does
   `tokio::fs::File::open(path)` and only understands local paths and
   `memory://`. With object-store shuffle enabled, writers reported
   `location.path = s3://...` and the gRPC handler then failed with
   `Failed to open file: No such file or directory`. Dispatch on the path
   scheme (s3 / abfs / az / gs) and route those to the existing object-store
   reader, sharing it via two new pub(crate) helpers
   (path_is_object_store, fetch_object_store_partition_stream).

2. Both the no-repart and hash-repart Arrow IPC writer paths serialised
   each batch via serialize_batch_to_ipc_bytes — every call writes a
   complete IPC stream WITH a StreamWriter::finish() EOS marker — and
   concatenated those streams into one S3 object. StreamReader stops at the
   first EOS, so any partition holding more than one batch came back as
   `Arrow error: Ipc error: Unexpected EOS`. Replace with a long-lived
   StreamingMultipartIpcUploader that writes the header once on
   construction, appends each batch's bytes into the multipart upload, and
   emits the EOS marker exactly once on finish().
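
The shape of the long-lived uploader in item 2 can be sketched with the standard library alone. Everything here is an assumption for illustration: `ToyStreamingUploader` is a hypothetical stand-in, a `Vec<Vec<u8>>` plays the role of the parts sent to a multipart upload, the caller supplies already-encoded header, batch, and EOS bytes, and flushing once a size threshold is reached is a design guess rather than something the commit message states.

```rust
/// Hypothetical stand-in for a streaming uploader; not the PR's implementation.
struct ToyStreamingUploader {
    buf: Vec<u8>,
    parts: Vec<Vec<u8>>, // stands in for parts sent to a multipart upload
    min_part_size: usize,
}

impl ToyStreamingUploader {
    /// Writes the stream header exactly once, at construction time.
    fn new(header: &[u8], min_part_size: usize) -> Self {
        let mut uploader = Self {
            buf: Vec::new(),
            parts: Vec::new(),
            min_part_size,
        };
        uploader.buf.extend_from_slice(header);
        uploader
    }

    /// Appends one encoded batch and flushes a part once enough bytes are buffered.
    fn write_batch(&mut self, encoded: &[u8]) {
        self.buf.extend_from_slice(encoded);
        if self.buf.len() >= self.min_part_size {
            self.flush_part();
        }
    }

    /// Moves the buffered bytes into the list of uploaded parts.
    fn flush_part(&mut self) {
        if !self.buf.is_empty() {
            self.parts.push(std::mem::take(&mut self.buf));
        }
    }

    /// Appends the end-of-stream marker exactly once and flushes the final part.
    fn finish(mut self, eos: &[u8]) -> Vec<Vec<u8>> {
        self.buf.extend_from_slice(eos);
        self.flush_part();
        self.parts
    }
}
```

Because `finish` takes `self` by value, the marker cannot be written twice on the same uploader, and the concatenation of all parts is one stream with a single header and a single end-of-stream marker.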

Verified end-to-end against a 1 scheduler + 2 executor local Spice cluster
with a multi-segment-prefix `shuffle_location: s3://bucket/<prefix>`:
- `SELECT COUNT(*) FROM t` (single-stage; exercises bug 1): succeeds.
- `SELECT col, COUNT(*) FROM t GROUP BY col ORDER BY ... LIMIT 5`
  (hash-repartition shuffle; exercises bug 2): succeeds.
@phillipleblanc phillipleblanc force-pushed the phillip/shuffle-final-fetch-via-object-store branch from 25ab388 to ad17b15 Compare May 20, 2026 03:14
@phillipleblanc

Addressed all three Copilot comments in ad17b153:

  1. path_is_object_store scope: narrowed to s3:// only, with an explicit doc comment explaining why (build_shuffle_object_store only supports s3 today; Azure / GCS are tracked as a follow-up). The pre-existing check_is_object_store_location continues to list all four schemes since its broader-scheme reader (fetch_partition_object_store_inner) handles them.
  2. Capacity reuse on drain: drain_buffer now does std::mem::replace(buf, Vec::with_capacity(buf.capacity())) so the cursor's allocation is reused across batches.
  3. Multi-batch round-trip test: added streaming_multipart_ipc_uploader_round_trips_multiple_batches, which writes 5 record batches through StreamingMultipartIpcUploader to an object_store::memory::InMemory and reads them back with StreamReader, asserting that all batches survive and no Unexpected EOS occurs. The test pulls the storage in via a new #[doc(hidden)] ObjectStoreShuffleStorage::new_for_test constructor that applies the same PrefixStore wrapping the production constructors do.
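
The drain pattern from item 2 can be tried in isolation. This is a minimal sketch with a hypothetical helper name, `drain_keep_capacity`; it reads the capacity before the swap, so the expression is slightly more spelled out than the one quoted above. Note that the replacement `Vec` is a fresh allocation sized to the old capacity, while the drained bytes leave together with the old allocation.

```rust
/// Hands back the buffered bytes and leaves behind an empty buffer
/// with at least the same capacity, so the next batch need not regrow it.
fn drain_keep_capacity(buf: &mut Vec<u8>) -> Vec<u8> {
    // Read the capacity first so the swap below does not borrow `buf` twice.
    let capacity = buf.capacity();
    std::mem::replace(buf, Vec::with_capacity(capacity))
}
```

By contrast, `std::mem::take` would leave behind a zero-capacity `Vec`, so the buffer would have to grow again from empty on the next write.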

cargo test -p ballista-core --lib shuffle_writer ✓ (4 passed). cargo fmt --check ✓. cargo clippy --all-targets --workspace --all-features -- -D warnings ✓.

@phillipleblanc phillipleblanc changed the title from "fix(shuffle): route s3 partition fetches through object store and stream per-partition IPC as a single Arrow stream" to "fix(shuffle): fetch S3 partitions directly and stream IPC writes" May 21, 2026
@phillipleblanc phillipleblanc merged commit 07be66a into spiceai-52.5 May 21, 2026
30 checks passed
@phillipleblanc phillipleblanc deleted the phillip/shuffle-final-fetch-via-object-store branch May 21, 2026 06:52